{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introduction\n",
    "\n",
    "This notebook demonstrates using GeoMesa FileSystem with Apache Spark in Scala, reading data stored in Azure Blob Storage.\n",
    "\n",
    "In this fictional scenarios, we want to find all vessels which have been in proximity to our notional vessel of interest."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Imports we'll need\n",
    "import org.apache.spark.sql.SQLTypes\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.locationtech.geomesa.spark.jts._\n",
    "\n",
    "// Register all udfs\n",
    "SQLTypes.init(spark.sqlContext) \n",
    "\n",
    "println(sc.version)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load & filter data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Load overall AIS dataframe \n",
    "// TODO: replace appropriate values\n",
    "val df = spark.read\n",
    "  .format(\"geomesa\")\n",
    "  .option(\"fs.path\",\"wasbs://<blob container name>@<storage account>.blob.core.windows.net/<path>\")\n",
    "  .option(\"geomesa.feature\", \"marinecadastre-ais-csv\")\n",
    "  .load()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Show the schema\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Total number of records\n",
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%Truncation off\n",
    "\n",
    "// Filter for ships within 50km of Galveston on 7th July 2017 and show the query plan\n",
    "val lon = -95.013398\n",
    "val lat = 29.2335042\n",
    "val dist = 50000\n",
    "val ships = df\n",
    "    .where(st_within($\"geom\", st_bufferPoint(st_makePoint(lon, lat), dist)))  \n",
    "    .where($\"BaseDateTime\" > from_utc_timestamp(lit(\"2017-07-07 00:00:00\"), \"Z\"))  \n",
    "    .where($\"BaseDateTime\" < from_utc_timestamp(lit(\"2017-07-07 23:59:59\"), \"Z\"))\n",
    "ships.explain(true)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Cache for performance \n",
    "ships.cache()\n",
    "ships.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%dataframe --limit 1000\n",
    "ships"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Find Vessel of Interest"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![vessel](https://photos.marinetraffic.com/ais/showphoto.aspx?photoid=3473160 \"Yellow Rose\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Filter for specific vessel of interest\n",
    "val name = \"YELLOW ROSE\"\n",
    "val interesting = ships.where($\"VesselName\" === name)\n",
    "interesting.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Plot where vessel of interest has been\n",
    "\n",
    "import org.locationtech.geomesa.jupyter._\n",
    "\n",
    "val voi = L.DataFrameLayerPoint(interesting, \"__fid__\", L.StyleOptions(\"#000000\", \"#FF0000\", 0.50))\n",
    "val osm = L.WMSLayer(\"osm_auto:all\", geoserverURL = \"https://maps.heigit.org/osm-wms/service/\")\n",
    "val aoi = L.Circle(lon, lat, dist, L.StyleOptions(\"#000000\", \"#FFFF00\", 0.15))\n",
    "\n",
    "kernel.display.html(L.render(Seq(osm, aoi, voi), (lat, lon), 8))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Look for \"suspicious\" vessels"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Quantise locations of voi using geohashes\n",
    "val precision = 35\n",
    "val interestingGhs = interesting\n",
    "    .select(st_geoHash($\"geom\", precision).as(\"gh\"), $\"VesselName\")\n",
    "    \n",
    "\n",
    "// Similarly, quantise locations of all ships\n",
    "val shipsGhs = ships\n",
    "    .select(st_geoHash($\"geom\", precision).as(\"gh\"), $\"VesselName\")\n",
    "\n",
    "// Count occurrences of ships in proximity to our voi\n",
    "// NOTE: have to use .as(...) since they have the same schema so column names clash\n",
    "// TODO: should also add a time criterion\n",
    "val suspects = shipsGhs.repartition(20).as(\"ships\")\n",
    "    .join(broadcast(interestingGhs.as(\"interesting\")))\n",
    "    .where($\"ships.gh\" === $\"interesting.gh\")\n",
    "    .where($\"ships.VesselName\" =!= $\"interesting.VesselName\")  // don't want to include self\n",
    "    .groupBy($\"ships.VesselName\")\n",
    "    .count()\n",
    "    .orderBy(desc(\"count\"))\n",
    "\n",
    "// For more precision, use something like this:\n",
    "//    .where(st_within($\"ships.geom\", st_bufferPoint($\"interesting.geom\", near)))\n",
    "// or st_distanceSpheroid\n",
    "// But need to look carefully at partitioning to avoid O(n^2) in the general case"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%dataframe --limit 100\n",
    "suspects"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
